Add streaming-in-data implementations #31

Open
armaan-25 wants to merge 7 commits into main from streaming-in-data

@armaan-25
Collaborator

Addresses #3

@Wenyueh Wenyueh marked this pull request as draft March 21, 2026 17:46
@Wenyueh
Collaborator

Wenyueh commented Mar 21, 2026

Always add a PR description containing (1) what this PR does, (2) how to test the result, and (3) which issue it resolves.

@armaan-25
Collaborator Author

(1) What this PR does: It adds streaming brute-force model selection with convergence-based auto-stop, plus fake-data validation utilities and tests.
(2) How to test result: Run `PYTHONPATH=src uv run --active python tests/streaming_brute_force_fake_data.py` for end-to-end behavior and `uv run --active pytest tests/test_streaming_brute_force.py -q` for unit checks.
(3) Which issue it resolves: It resolves issue #3 by implementing and validating streaming incoming-data model selection starting from brute force.

@Wenyueh
Collaborator

Wenyueh commented Mar 22, 2026

nice!

@armaan-25 armaan-25 changed the title from "Add streaming brute-force model selector" to "Add streaming-in-data implementations" Mar 24, 2026
@Wenyueh Wenyueh marked this pull request as ready for review March 24, 2026 13:52
Copilot AI review requested due to automatic review settings March 24, 2026 13:52

Copilot AI left a comment

Pull request overview

Adds streaming-capable model selection variants intended for “online” scenarios where labeled data arrives in batches (Issue #3).

Changes:

  • Introduces StreamingRandomSearchModelSelector and StreamingBruteForceModelSelector with update()/update_one() and convergence tracking.
  • Exposes new selectors via agentopt.model_selection and top-level agentopt exports; adds method="streaming_random" to ModelSelector.
  • Adds tests and documentation updates for streaming random search (plus README example for streaming brute force).

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 15 comments.

| File | Description |
|:-----|:------------|
| `src/agentopt/model_selection/streaming_random_search.py` | New streaming random-search selector with fixed sampled subset and incremental updates |
| `src/agentopt/model_selection/streaming_brute_force.py` | New streaming brute-force selector intended to update cumulative metrics per batch |
| `src/agentopt/model_selection/__init__.py` | Exports streaming selectors from the model_selection package |
| `src/agentopt/__init__.py` | Exports streaming selectors at top-level; adds streaming_random factory method |
| `tests/test_streaming_random_search.py` | Unit tests for streaming random search behavior and ModelSelector factory support |
| `tests/test_streaming_brute_force.py` | Unit tests for streaming brute force update behavior |
| `tests/streaming_brute_force_fake_data.py` | Runnable demo script for streaming brute force on synthetic data |
| `docs/concepts/algorithms.md` | Adds “Streaming Random Search” to algorithms list and documents usage |
| `docs/api/selectors.md` | Adds API doc entry for StreamingRandomSearchModelSelector |
| `README.md` | Adds streaming_random method to selector table and an example for streaming brute force |

Comment on lines +264 to +265

if combo_unchanged and improvement < self._CONVERGENCE_DELTA:

Copilot AI Mar 24, 2026

In _update_convergence_state, improvement = current_acc - self._best_accuracy and the stability check uses improvement < _CONVERGENCE_DELTA. If accuracy drops significantly (negative improvement), this still counts as “stable” and can trigger convergence even though performance changed materially. Consider using an absolute delta (abs(current_acc - self._best_accuracy) < ...) or tracking stability relative to a running best/EMA, so large regressions reset patience.

Suggested change
    - if combo_unchanged and improvement < self._CONVERGENCE_DELTA:
    + delta = abs(improvement)
    + if combo_unchanged and delta < self._CONVERGENCE_DELTA:
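
The comment also mentions tracking stability relative to a running best or EMA. A minimal sketch of the EMA variant follows. The names `EmaStability`, `alpha`, and `delta` are hypothetical and not part of AgentOpt, and the default values are assumptions chosen only for illustration.

```python
from typing import Optional


class EmaStability:
    """Hypothetical helper: a batch is stable when accuracy stays near its EMA."""

    def __init__(self, alpha: float = 0.3, delta: float = 0.01) -> None:
        self.alpha = alpha  # smoothing factor (assumed value)
        self.delta = delta  # stability tolerance (assumed value)
        self.ema: Optional[float] = None  # no history before the first batch

    def observe(self, accuracy: float) -> bool:
        """Return True if `accuracy` is within `delta` of the EMA seen so far."""
        if self.ema is None:
            self.ema = accuracy
            return False  # the first batch has nothing to compare against
        # Absolute difference, so large drops count as instability.
        stable = abs(accuracy - self.ema) < self.delta
        # Fold the new observation into the running average.
        self.ema = self.alpha * accuracy + (1 - self.alpha) * self.ema
        return stable
```

Compared with the one-line `abs()` fix, this smooths out single noisy batches at the cost of one extra tunable parameter.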

Copilot uses AI. Check for mistakes.
Comment on lines +278 to +283

    if combo_unchanged and improvement < self._CONVERGENCE_DELTA:
        self._stable_batches += 1
    else:
        self._stable_batches = 0

Copilot AI Mar 24, 2026

Convergence logic uses improvement = current_acc - self._best_accuracy and treats improvement < _CONVERGENCE_DELTA as “stable”. Large negative changes (accuracy drops) will still be counted as stable and can trigger convergence prematurely. Using an absolute delta (abs(current_acc - self._best_accuracy) < ...) or another stability criterion would avoid converging through regressions.

Suggested change
    - if combo_unchanged and improvement < self._CONVERGENCE_DELTA:
    -     self._stable_batches += 1
    - else:
    -     self._stable_batches = 0
    + stable_accuracy = abs(improvement) < self._CONVERGENCE_DELTA
    + if combo_unchanged and stable_accuracy:
    +     self._stable_batches += 1
    + else:
    +     self._stable_batches = 0
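
To make the effect of the absolute-delta check concrete, here is a small standalone sketch of a patience counter. `PatienceTracker` and its parameters are hypothetical stand-ins for the selector's internal state. As a simplifying assumption it compares each batch against the previous batch's accuracy, which may differ from how the PR maintains `_best_accuracy`.

```python
from typing import Optional


class PatienceTracker:
    """Hypothetical stand-in for the selector's convergence state."""

    def __init__(self, delta: float = 0.01, patience: int = 3) -> None:
        self.delta = delta
        self.patience = patience
        self.stable_batches = 0
        self.last_accuracy: Optional[float] = None

    def step(self, accuracy: float, combo_unchanged: bool = True) -> bool:
        """Update the counter for one batch; return True once converged."""
        if self.last_accuracy is None:
            change = float("inf")  # the first batch can never count as stable
        else:
            change = abs(accuracy - self.last_accuracy)
        if combo_unchanged and change < self.delta:
            self.stable_batches += 1
        else:
            self.stable_batches = 0  # regressions reset patience as well
        self.last_accuracy = accuracy
        return self.stable_batches >= self.patience


tracker = PatienceTracker(delta=0.01, patience=2)
results = [tracker.step(acc) for acc in [0.80, 0.802, 0.60, 0.601, 0.602]]
print(results)  # [False, False, False, False, True]
```

With a signed comparison, the drop from 0.802 to 0.60 would have counted as a second stable batch and triggered convergence at that point. With the absolute check, the drop resets the counter and convergence is only reported after two further stable batches.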

Comment on lines +3 to 12
AgentOpt provides 9 selection algorithms. Choose based on your search space size and evaluation budget.

## At a Glance

| Algorithm | Strategy | Evaluations | Best For |
|:----------|:---------|:------------|:---------|
| [Brute Force](#brute-force) | Exhaustive | All | Small spaces (< 50 combos) |
| [Random Search](#random-search) | Sampling | Configurable fraction | Quick baselines |
| [Streaming Random Search](#streaming-random-search) | Streaming sampling | Incremental | Online / incoming batches |
| [Hill Climbing](#hill-climbing) | Greedy + restarts | Guided neighbors | Medium spaces |

Copilot AI Mar 24, 2026

The guide now says “AgentOpt provides 9 selection algorithms”, but this PR introduces two new selectors (StreamingRandomSearchModelSelector and StreamingBruteForceModelSelector). Either document both streaming algorithms here (and update the count/table accordingly), or explicitly clarify that streaming brute-force is considered part of “Brute Force” rather than a separate algorithm (and add a note/section so users can discover it).

Comment on lines +95 to +101
    if self._converged:
        print(
            "\nStreaming random search converged; skipping new batch. "
            "Current best combo is stable."
        )
        return self.results()

Copilot AI Mar 24, 2026

update() returns early once _converged is set, preventing callers from continuing to incorporate new batches (even if drift occurs). Since should_continue() already exists, consider letting update() always process the batch (or provide an explicit override) and keep convergence as an advisory signal rather than a hard stop.
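
One way to read the "explicit override" option is sketched below. `StreamingSelectorSketch` and the `force` parameter are hypothetical and only illustrate the control flow; the real selector's `update()` signature and return type differ.

```python
from typing import List


class StreamingSelectorSketch:
    """Hypothetical selector that treats convergence as advisory."""

    def __init__(self) -> None:
        self.converged = False
        self.seen: List[int] = []

    def should_continue(self) -> bool:
        # Advisory signal: callers may consult this before sending more data.
        return not self.converged

    def update(self, batch: List[int], force: bool = False) -> int:
        # Skip only when converged AND the caller has not asked to override.
        if self.converged and not force:
            return len(self.seen)
        self.seen.extend(batch)
        return len(self.seen)


selector = StreamingSelectorSketch()
selector.update([1, 2])
selector.converged = True            # pretend the stability check fired
selector.update([3])                 # skipped: convergence is respected by default
selector.update([3], force=True)     # processed: caller suspects drift
```

Keeping the default as "skip" preserves the current behavior, while `force=True` gives callers a way to resume when they have reason to believe the data distribution has shifted.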

Comment on lines +20 to +26
    def test_streaming_update_accumulates_and_tracks_best():
        selector = StreamingBruteForceModelSelector(
            agent_fn=_agent_fn,
            models={"agent": ["good", "bad"]},
            eval_fn=_eval_fn,
            dataset=[({"x": 0}, 0)],
        )

Copilot AI Mar 24, 2026

This test constructs StreamingBruteForceModelSelector(agent_fn=...), but the selector currently inherits BaseModelSelector whose constructor requires agent (and does not accept agent_fn). As written, instantiating the selector will raise a TypeError before these assertions run. Once the selector’s constructor is aligned with the base agent interface, update this test to match the supported API.
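
The failure mode described here can be reproduced in isolation. In the sketch below, `BaseSelectorSketch` and `StreamingSketch` are hypothetical classes whose constructor signature is assumed from the comment (an `agent` parameter and no `agent_fn`); they are not the real AgentOpt classes.

```python
class BaseSelectorSketch:
    """Hypothetical base class whose constructor only knows `agent`."""

    def __init__(self, agent, models, eval_fn, dataset) -> None:
        self.agent = agent
        self.models = models
        self.eval_fn = eval_fn
        self.dataset = dataset


class StreamingSketch(BaseSelectorSketch):
    """Inherits the constructor unchanged, so `agent_fn` is not a valid keyword."""


def try_construct() -> str:
    """Return the name of the exception raised, or "ok" if construction works."""
    try:
        StreamingSketch(
            agent_fn=lambda x: x,  # keyword the base constructor does not define
            models={"agent": ["good", "bad"]},
            eval_fn=lambda pred, gold: pred == gold,
            dataset=[({"x": 0}, 0)],
        )
    except TypeError as exc:
        return type(exc).__name__
    return "ok"
```

Because the exception is raised inside the constructor call, none of the test's later assertions would be reached.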

Comment on lines +69 to +77
    def _run_selection(
        self, parallel: bool = False, max_concurrent: int = 20,
    ) -> SelectionResults:
        # Keep select_best behavior: evaluate the provided dataset once.
        if not self._seed_consumed:
            result = self.update(self.dataset, parallel=parallel, max_concurrent=max_concurrent)
            self._seed_consumed = True
            return result
        return self.results()

Copilot AI Mar 24, 2026

_run_selection() is used via BaseModelSelector.select_best(), which always calls self._tracker.stop() afterward. For streaming selectors, users will typically call select_best() once (warm start) and then call update() repeatedly; after the first select_best(), the tracker will be stopped and subsequent updates won’t record token usage / cache hits. Consider overriding select_best() in streaming selectors (or restarting the tracker at the start of update()) so streaming updates continue to be tracked.
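
A rough sketch of the "restart the tracker at the start of `update()`" option follows. `TrackerSketch` and its `start()`/`stop()`/`record()` methods are hypothetical; the real tracker's interface is not shown in this PR, so this only illustrates the ordering of calls.

```python
from typing import List


class TrackerSketch:
    """Hypothetical usage tracker; start() is safe to call repeatedly."""

    def __init__(self) -> None:
        self.active = False
        self.recorded = 0

    def start(self) -> None:
        self.active = True

    def stop(self) -> None:
        self.active = False

    def record(self, n: int) -> None:
        # Usage is only counted while the tracker is running.
        if self.active:
            self.recorded += n


class StreamingSelectorWithTracking:
    """Hypothetical selector that re-arms tracking on every update()."""

    def __init__(self) -> None:
        self._tracker = TrackerSketch()

    def select_best(self, dataset: List[int]) -> int:
        self._tracker.start()
        try:
            return self.update(dataset)
        finally:
            self._tracker.stop()  # mirrors the base-class behavior described above

    def update(self, batch: List[int]) -> int:
        self._tracker.start()  # restart in case select_best() stopped the tracker
        self._tracker.record(len(batch))
        return self._tracker.recorded
```

The trade-off is that the tracker stays running after the last `update()`, so a streaming selector built this way would probably also want an explicit way to stop it.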

Comment on lines +53 to +54
self._all_combos: List[Dict[str, ModelCandidate]] = self._all_combos()
total = len(self._all_combos)

Copilot AI Mar 24, 2026

This assigns self._all_combos to a list, which shadows the inherited BaseModelSelector._all_combos() method on the instance. That can break any later call to self._all_combos() (TypeError). Prefer a different attribute name (e.g., self._all_combos_list / self._combos).
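
The shadowing problem is standard Python attribute lookup and can be shown with a few lines. The classes below are hypothetical reductions of the selector, keeping only the `_all_combos` name from the PR.

```python
class Base:
    def _all_combos(self):
        # Stand-in for the inherited method that enumerates combinations.
        return [{"agent": "good"}, {"agent": "bad"}]


class Shadowing(Base):
    def __init__(self) -> None:
        # The instance attribute now hides the method of the same name.
        self._all_combos = self._all_combos()


class Renamed(Base):
    def __init__(self) -> None:
        # A distinct attribute name leaves the method callable.
        self._combos = self._all_combos()


def second_call_fails(obj) -> bool:
    """Return True if calling obj._all_combos() raises TypeError."""
    try:
        obj._all_combos()
    except TypeError:  # raised because a list is not callable
        return True
    return False
```

Here `second_call_fails(Shadowing())` is true because the attribute is now a list, while `second_call_fails(Renamed())` is false.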

Comment on lines +95 to +101
        dataset=warm_start_dataset, # seed batch
        sample_fraction=0.25,
        seed=42,
    )

    selector.select_best() # evaluate warm start once
    selector.update(stream_batch_1) # keep updating online

Copilot AI Mar 24, 2026

The example uses selector.select_best() and then calls selector.update(...). Note that select_best() (via BaseModelSelector) stops the tracker on return, so subsequent streaming updates won’t record token usage / cache hits unless the selector restarts the tracker. Either adjust the example to warm-start via update(warm_start_dataset) instead, or update the streaming selector implementation to keep tracking active across select_best() + update().

Suggested change
    -     dataset=warm_start_dataset, # seed batch
    -     sample_fraction=0.25,
    -     seed=42,
    - )
    - selector.select_best() # evaluate warm start once
    - selector.update(stream_batch_1) # keep updating online
    +     sample_fraction=0.25,
    +     seed=42,
    + )
    + # warm-start with an initial batch
    + selector.update(warm_start_dataset)
    + # then keep updating online as new batches arrive
    + selector.update(stream_batch_1)

armaan-25 and others added 2 commits March 25, 2026 16:53
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

3 participants